package org.snake.nebulae.core.task;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.snake.nebulae.core.common.DefaultMqttFactory;
import org.snake.nebulae.core.exception.RequestException;
import org.snake.nebulae.core.model.MqttFailureHolder;
import org.snake.nebulae.core.model.RequestInfo;
import org.snake.nebulae.core.model.ResponseInfo;
import org.snake.nebulae.core.model.ServiceInfo;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Runs one MQTT request/response round trip against a service found in the pulled service list.
 *
 * <p>{@link #run()} subscribes to the service-pull topic; once a service list has been received
 * it builds a {@link RequestInfo}, publishes it and waits for the reply on the response topic.
 * Progress and outcome are exposed through the protected {@link CompletableFuture} fields so that
 * subclasses can attach their own handling (see {@link #doLogic()}).
 */
@Component
@Slf4j
public class CoreRequestTask implements Runnable {

    // Required collaborators
    @Resource
    private DefaultMqttFactory defaultMqttFactory;

    private MqttAsyncClient mqttAsyncClient;

    @Resource
    private ObjectMapper objectMapper;

    // Completion stages
    /**
     * Completed with the decoded service list when a message arrives on the service-pull topic.
     */
    protected CompletableFuture<ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>>> start = new CompletableFuture<>();

    /**
     * Completed with the decoded response, or completed exceptionally when the request cannot be
     * built, validated, sent, or its response cannot be decoded.
     */
    protected CompletableFuture<ResponseInfo> end = new CompletableFuture<>();

    /**
     * Completed when subscribing to the service-pull topic succeeds.
     */
    protected CompletableFuture<IMqttToken> doFinishedSubscribeServicePullTopic = new CompletableFuture<>();

    /**
     * Completed when subscribing to the service-pull topic fails.
     */
    protected CompletableFuture<MqttFailureHolder> doErrorSubscribeServicePullTopic = new CompletableFuture<>();

    /**
     * Completed when publishing the request succeeds.
     */
    protected CompletableFuture<IMqttToken> doPostRequestSuccess = new CompletableFuture<>();

    /**
     * Completed when publishing the request fails.
     */
    protected CompletableFuture<MqttFailureHolder> doPostRequestError = new CompletableFuture<>();

    /**
     * Completed when subscribing to the response topic succeeds.
     */
    protected CompletableFuture<IMqttToken> doGetResponseBeforeSuccess = new CompletableFuture<>();

    /**
     * Completed when subscribing to the response topic fails.
     */
    protected CompletableFuture<MqttFailureHolder> doGetResponseBeforeError = new CompletableFuture<>();

    /**
     * Hook for "response received successfully". This class never completes it.
     * NOTE(review): the name is misspelled and the element type looks like a failure holder;
     * both are kept as-is because the field is part of the protected interface.
     */
    protected CompletableFuture<MqttFailureHolder> doGetReponseSuccess = new CompletableFuture<>();

    /**
     * Latest service list received from the service-pull topic; replaced wholesale on each message.
     * The nesting is read in this class as service name -> method name -> instances.
     */
    private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>> serviceList = new ConcurrentHashMap<>();

    /**
     * Response decoded by the most recent execution.
     */
    protected ResponseInfo responseInfo;

    /**
     * Picks an MQTT client from the factory and then invokes the {@link #doLogic()} hook.
     */
    @PostConstruct
    private void init() {
        this.mqttAsyncClient = defaultMqttFactory.choseOneClient();
        doLogic();
    }

    /**
     * Extension hook invoked once from {@link #init()}; the default implementation does nothing.
     */
    protected void doLogic() {

    }

    /**
     * Subscribes to the service-pull topic and, once a service list has arrived, sends the request
     * and listens for its response.
     *
     * <p>Any failure while building, validating or sending the request completes {@link #end}
     * exceptionally instead of being silently dropped.
     */
    @SneakyThrows
    @Override
    public void run() {

        // Hook: outcome of subscribing to the service-pull topic
        IMqttActionListener servicesPullAction = new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                log.info("{} 监听服务拉取主题完成 {} 时间是 {}", this.getClass(), CoreTask.topicServicesPull, DateUtil.now());

                doFinishedSubscribeServicePullTopic.complete(asyncActionToken);
            }

            @SneakyThrows
            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                log.error("{} 监听服务拉取主题失败 {} 时间是 {} 原因是 {}", this.getClass(), CoreTask.topicServicesPull, DateUtil.now(), exception.getMessage());

                doErrorSubscribeServicePullTopic.complete(new MqttFailureHolder(asyncActionToken, exception));

                mqttAsyncClient.unsubscribe(CoreTask.topicServicesPull);
            }
        };

        // Hook: handles each service-list message
        // todo: refresh the service list on repeated pulls
        IMqttMessageListener servicesPullListener = (topic, message) -> {
            // Decode with an explicit charset instead of the platform default
            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
            ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>> services = objectMapper.readValue(msg, new TypeReference<ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>>>() {
            });

            log.info("{} 获取拉取一次当前服务列表信息 {} 时间是 {}", this.getClass(), msg, DateUtil.now());

            this.serviceList = services;

            start.complete(services);
        };

        // Service discovery
        mqttAsyncClient.subscribe(CoreTask.topicServicesPull, 2, this, servicesPullAction, servicesPullListener);

        // Request/response flow, started once the service list is available
        start.thenAcceptAsync(services -> {
            try {

                RequestInfo requestInfo = createRequestInfo();

                // Validate request completeness; the type and message of the reported Throwable
                // are kept, with the original RequestException attached as its cause
                RequestException invalidRequest = doCheckRequestInfo(requestInfo);
                if (Objects.nonNull(invalidRequest)) {
                    end.completeExceptionally(new Throwable("请求参数不完整", invalidRequest));
                    return;
                }

                // Validate that the target service/method is known
                RequestException re = doCheckService(requestInfo);
                if (Objects.nonNull(re)) {
                    end.completeExceptionally(new Throwable(re.getMessage(), re));
                    return;
                }

                IMqttActionListener postRequestAction = new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        log.info("{} 发送请求成功 {} 时间是 {}", this.getClass(), requestInfo.getRequestTopic(), DateUtil.now());
                        doPostRequestSuccess.complete(asyncActionToken);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        log.error("{} 发送请求失败 {} 时间是 {} 原因是 {}", this.getClass(), requestInfo.getRequestTopic(), DateUtil.now(), exception.getMessage());
                        doPostRequestError.complete(new MqttFailureHolder(asyncActionToken, exception));
                    }
                };

                // Hook: outcome of subscribing to the response topic
                IMqttActionListener responseBeforeAction = new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        log.info("{} 监听结果成功 {} 时间是 {}", this.getClass(), requestInfo.getResponseTopic(), DateUtil.now());
                        doGetResponseBeforeSuccess.complete(asyncActionToken);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        log.error("{} 监听结果失败 {} 时间是 {} 原因是 {}", this.getClass(), requestInfo.getResponseTopic(), DateUtil.now(), exception.getMessage());
                        doGetResponseBeforeError.complete(new MqttFailureHolder(asyncActionToken, exception));
                    }
                };

                // Decodes the response and finishes the round trip
                IMqttMessageListener responseListener = (topic, message) -> {
                    try {
                        String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
                        this.responseInfo = objectMapper.readValue(msg, ResponseInfo.class);
                    } catch (Exception e) {
                        // Do not leave waiters on `end` hanging when the payload cannot be decoded
                        end.completeExceptionally(e);
                        throw e;
                    }

                    mqttAsyncClient.unsubscribe(requestInfo.getResponseTopic());
                    end.complete(this.responseInfo);
                };

                // The response subscription is requested before the request is published so that
                // a fast reply is less likely to arrive while nobody is listening for it.
                mqttAsyncClient.subscribe(requestInfo.getResponseTopic(), 2, this, responseBeforeAction, responseListener);

                String requestStr = objectMapper.writeValueAsString(requestInfo);
                mqttAsyncClient.publish(requestInfo.getRequestTopic(), requestStr.getBytes(StandardCharsets.UTF_8), 2, false, this, postRequestAction);
            } catch (Exception e) {
                log.error("{} 执行请求失败 时间是 {}", this.getClass(), DateUtil.now(), e);
                end.completeExceptionally(e);
            }
        });
    }

    /**
     * Builds the request: assigns a fresh request id, lets subclasses fill in the details, picks a
     * service instance and derives the request and response topics.
     *
     * @return the populated request
     * @throws RequestException when the request is incomplete or no matching service instance is known
     */
    protected RequestInfo createRequestInfo() throws RequestException {
        RequestInfo requestInfo = new RequestInfo();

        String requestId = IdUtil.simpleUUID();
        requestInfo.setRequestId(requestId);

        doSetRequestInfo(requestInfo);

        // Validate before the service lookup: ConcurrentHashMap rejects null keys, so an unset
        // service or method name would otherwise surface as a NullPointerException.
        RequestException invalidRequest = doCheckRequestInfo(requestInfo);
        if (Objects.nonNull(invalidRequest)) {
            throw invalidRequest;
        }

        String serviceId = choseService(requestInfo);

        // todo: request-topic generator -> factory pattern
        // todo: response-topic generator -> factory pattern
        String requestTopic = StrUtil.format("{}/{}/{}/{}/{}", CoreTask.topicServices, requestInfo.getName(), serviceId, requestInfo.getMethodName(), requestId);
        String responseTopic = StrUtil.format("{}{}/{}/{}/{}{}/{}", CoreTask.topicPrefixShare, CoreTask.topicServices, requestInfo.getName(), serviceId, requestInfo.getMethodName(), CoreTask.topicResponseExt, requestId);
        requestInfo.setRequestTopic(requestTopic);
        requestInfo.setResponseTopic(responseTopic);

        return requestInfo;
    }

    /**
     * Extension hook for subclasses to populate the request (service name, method name, ...).
     * The default implementation does nothing.
     *
     * @param requestInfo request being built
     */
    protected void doSetRequestInfo(RequestInfo requestInfo) {

    }

    /**
     * Checks that the request names both a service and a method.
     *
     * @param requestInfo request to validate
     * @return a {@link RequestException} describing the problem, or {@code null} when the request is complete
     */
    protected RequestException doCheckRequestInfo(RequestInfo requestInfo) {
        if (StrUtil.isEmpty(requestInfo.getName()) || StrUtil.isEmpty(requestInfo.getMethodName())) {
            return new RequestException("请求参数不完成");
        }

        return null;
    }

    /**
     * Checks that the requested service and method are present in the current service list.
     *
     * @param requestInfo request to validate
     * @return a {@link RequestException} describing what is missing, or {@code null} when available
     */
    protected RequestException doCheckService(RequestInfo requestInfo) {
        // Single lookup instead of containsKey + get: the list can be replaced between the two
        // calls, and a null key is not accepted by ConcurrentHashMap.
        ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>> methodInstances =
                Objects.isNull(requestInfo.getName()) ? null : this.serviceList.get(requestInfo.getName());
        if (Objects.isNull(methodInstances)) {
            String errMsg = StrUtil.format("服务列表中没有相关服务 : {}", requestInfo.getName());
            return new RequestException(errMsg);
        }

        ConcurrentLinkedQueue<ServiceInfo> instances =
                Objects.isNull(requestInfo.getMethodName()) ? null : methodInstances.get(requestInfo.getMethodName());
        // A method with no registered queue is reported as missing rather than dereferenced
        if (Objects.isNull(instances) || instances.stream().noneMatch(serviceInfo -> serviceInfo.getMethods().contains(requestInfo.getMethodName()))) {
            String errMsg = StrUtil.format("服务列表中没有相关功能 : {}", requestInfo.getMethodName());
            return new RequestException(errMsg);
        }

        return null;
    }

    /**
     * Picks one service instance for the request and returns its id. The head of the instance
     * queue registered for the requested service and method is used.
     *
     * @param requestInfo request carrying the service and method names
     * @return id of the chosen service instance
     * @throws RequestException when the service list is empty or holds no instance for the request
     */
    protected String choseService(RequestInfo requestInfo) throws RequestException {
        // Work on one snapshot so a concurrent replacement of the list cannot mix two versions
        ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>> snapshot = this.serviceList;
        if (snapshot.isEmpty()) {
            String error = StrUtil.format("{} 构造客户端请求失败 原因是客户端服务列表为空 时间是 {}", this.getClass(), DateUtil.now());
            log.error(error);
            throw new RequestException(error);
        }

        ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>> methodInstances = snapshot.get(requestInfo.getName());
        if (Objects.isNull(methodInstances) || methodInstances.isEmpty()) {
            String error = StrUtil.format("{} 构造客户端请求失败 原因是相关服务模块没有方法实例 时间是 {}", this.getClass(), DateUtil.now());
            log.error(error);
            throw new RequestException(error);
        }

        ConcurrentLinkedQueue<ServiceInfo> instances = methodInstances.get(requestInfo.getMethodName());
        // peek() returns null for an empty queue, so one null check covers "no queue",
        // "empty queue" and "emptied concurrently" without traversing the queue via size()
        ServiceInfo first = Objects.isNull(instances) ? null : instances.peek();
        if (Objects.isNull(first)) {
            String error = StrUtil.format("{} 构造客户端请求失败 原因是相关服务模块没有方法实例 时间是 {}", this.getClass(), DateUtil.now());
            log.error(error);
            throw new RequestException(error);
        }

        return first.getServiceId();
    }
}
